package com.ld.shieldsb.canalclient.handler.impl;

import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.google.common.collect.ImmutableMap;
import com.ld.shieldsb.canalclient.etl.EtlConsumer;
import com.ld.shieldsb.canalclient.etl.EtlServiceUtil;
import com.ld.shieldsb.canalclient.handler.ICanalDataSyncHandler;
import com.ld.shieldsb.canalclient.handler.config.AdapterConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig.DbMapping;
import com.ld.shieldsb.canalclient.handler.config.OuterAdapterConfig;
import com.ld.shieldsb.canalclient.handler.impl.db.RdbEtlService;
import com.ld.shieldsb.canalclient.handler.impl.db.RdbSyncService;
import com.ld.shieldsb.canalclient.model.Dml;
import com.ld.shieldsb.canalclient.model.EtlConditions;
import com.ld.shieldsb.canalclient.recoder.Recorder;
import com.ld.shieldsb.common.core.collections.ListUtils;
import com.ld.shieldsb.common.core.model.Result;
import com.ld.shieldsb.common.core.util.ResultUtil;
import com.ld.shieldsb.common.core.util.StringUtils;

import lombok.extern.slf4j.Slf4j;

/**
 * 关系型数据库处理器
 * 
 * @ClassName LoggerHandler
 * @author <a href="mailto:donggongai@126.com" target="_blank">吕凯</a>
 * @date 2021年11月13日 下午4:21:18
 *
 */
@Slf4j
public class Data2RDbHandler implements ICanalDataSyncHandler {
    private RdbSyncService rdbSyncService;
    private DruidDataSource dataSource; // 数据库

    private Map<String, Map<String, MappingConfig>> mappingConfigCache; // 库表映射缓存

    public static final String TYPE_NAME = "rdb";
    private boolean inited = false; // 初始化字段标识
    private String key; // 唯一标识

    public static final String PROPERTIES_DESP = "jdbc.url：数据库连接url;\n jdbc.username：数据库的用户名;\n jdbc.password：数据库的密码;\nthreads:线程数;"; // OuterAdapterConfig中properties参数说明，必须有

    /**
     * 初始化
     * 
     * @Title init
     * @author 吕凯
     * @date 2021年12月2日 上午11:36:40
     * @param configuration
     * @see com.ld.ICanalDataSyncHandler.canal.common.util.handler.ICanalDataHandler#init(com.ld.sc.canal.common.util.handler.config.OuterAdapterConfig)
     */
    @Override
    public void init(OuterAdapterConfig configuration) {
        key = configuration.getKey();
        if (log.isWarnEnabled()) {
            log.warn("RDB关系型数据库处理器[key:{}]初始化", configuration.getKey());
        }
        // 初始化连接池
        Map<String, String> properties = configuration.getProperties();
        dataSource = new DruidDataSource();
//        dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
        dataSource.setUrl(properties.get("jdbc.url"));
        dataSource.setUsername(properties.get("jdbc.username"));
        dataSource.setPassword(properties.get("jdbc.password"));
        dataSource.setInitialSize(1); // 配置获取连接等待超时的时间
        dataSource.setMinIdle(1); // 配置获取连接等待超时的时间
        dataSource.setMaxActive(30); // 配置获取连接等待超时的时间
        dataSource.setMaxWait(60000); // 配置获取连接等待超时的时间，在此时间内获取不到会一直尝试
        dataSource.setTimeBetweenEvictionRunsMillis(60000); // 配置间隔多久才进行一次检测，检测需要关闭的空闲连接，单位是毫秒
        dataSource.setMinEvictableIdleTimeMillis(300000); // 配置一个连接在池中最小生存的时间，单位是毫秒
        dataSource.setUseUnfairLock(true);
        dataSource.setConnectionErrorRetryAttempts(5); // 连接出错时重试次数
        dataSource.setBreakAfterAcquireFailure(true); // 连接出错时重试N次后是否退出，需要退出的可以通过exitenProperties配置
        dataSource.setTimeBetweenConnectErrorMillis(300000); // 连接出错后重试时间间隔,5分钟

        // List<String> array = new ArrayList<>();
        // array.add("set names utf8mb4;");
        // dataSource.setConnectionInitSqls(array);

        try {
            dataSource.init();
        } catch (SQLException e) {
            log.error("ERROR ## failed to initial datasource: " + properties.get("jdbc.url"), e);
        }

        mappingConfigCache = new ConcurrentHashMap<>(); // 库名-表名对应配置

        List<AdapterConfig> configs = configuration.getConfigs();
        if (ListUtils.isNotEmpty(configs)) {
            for (AdapterConfig config : configs) {
                if (config instanceof MappingConfig) { // 类型判断

                    MappingConfig mc = (MappingConfig) config;
                    String sourceDatabase = mc.getDbMapping().getDatabase();
                    String sourceTable = mc.getDbMapping().getTable();

                    mappingConfigCache.put(mc.getDestination() + "_" + sourceDatabase + "-" + sourceTable, ImmutableMap.of("rdb", mc));
                }
            }
        }

        String threads = properties.get("threads"); // 线程数
        rdbSyncService = new RdbSyncService(dataSource, threads != null ? Integer.valueOf(threads) : null, true);
        inited = true; // 标记已经初始化
    }

    @Override
    public void sync(List<Dml> dmls, Recorder recorder) {
        if (dmls == null || dmls.isEmpty()) {
            return;
        }
        try {
            rdbSyncService.sync(mappingConfigCache, dmls, recorder, key);
        } catch (Exception e) {
            log.error("[" + key + "]数据库存储数据处理失败！！！", e);
        }
    }

    /**
     * ETL方法
     *
     * @param task
     *            任务名, 对应配置名,任务名对应配置文件名 mytest_user.yml
     * @param params
     *            etl筛选条件
     * @return ETL结果
     */
    @Override
    public Result etl(String destination, String database, String table, String targetDb, String targetTable, EtlConditions etlConditions,
            Consumer<EtlConsumer> con) {
        Result etlResult = new Result();
        MappingConfig config = null;

        Map<String, MappingConfig> configMap = mappingConfigCache.get(destination + "_" + database + "-" + table);

        if (configMap == null || configMap.values().isEmpty()) {
            return ResultUtil.error("配置参数为空！");
        }
        String etlCondition = etlConditions.getEtlCondition();
        String etlDelta = etlConditions.getEtlDelta();
        List<String> params = etlConditions.getParams();
        for (MappingConfig configM : configMap.values()) {
            DbMapping dbMapping = configM.getDbMapping();
            String mappingTargetTable = (dbMapping.getTargetTable() == null ? "" : dbMapping.getTargetTable());
            String mappingTargetDb = (dbMapping.getTargetDb() == null ? "" : dbMapping.getTargetDb());
            if (mappingTargetDb.equalsIgnoreCase(targetDb) && mappingTargetTable.equalsIgnoreCase(targetTable)) {
                config = configM;
                // 覆盖etl的条件,参数为空说明不需要传参，那么就是没有附件条件，不修改
                if ((StringUtils.isNotEmpty(etlDelta) || StringUtils.isNotEmpty(etlCondition))) {
                    dbMapping.setEtlCondition(etlCondition);
                    dbMapping.setEtlDelta(etlDelta);
                }
            }
        }

        if (config != null) {
            if (StringUtils.isNotEmpty(etlConditions.getDataSourceKey())) {
                config.setDataSourceKey(etlConditions.getDataSourceKey());
            }
            RdbEtlService rdbEtlService = new RdbEtlService(dataSource, config);
            // 设置识别符号
            rdbEtlService.setUniqueKey(etlConditions.getUniqueKey());
            // 将etl防止到service中放到map中
            if (StringUtils.isNotEmpty(etlConditions.getUniqueKey())) {

                EtlServiceUtil.getServiceMap().put(etlConditions.getUniqueKey(), rdbEtlService);
            }
            return rdbEtlService.importData(params, etlConditions.getType(), con);
        }
        etlResult.setSuccess(false);
        etlResult.setErrorMessage("任务未找到！");
        return etlResult;
    }

    @Override
    public void destroy() {
        if (rdbSyncService != null) {
            rdbSyncService.close();
        }

        if (dataSource != null) {
            dataSource.close();
        }

    }

    @Override
    public String getType() {
        return TYPE_NAME;
    }

    @Override
    public boolean getInit() {
        return inited;
    }

    @Override
    public Result testConn() {
        return ResultUtil.success("");
    }

    @Override
    public String getKey() {
        return key;
    }

    /**
     * 同步单条的dml数据，用于单条数据处理失败的情况，手动同步
     * 
     * @Title syncSingle
     * @author 吕凯
     * @date 2022年1月12日 下午5:14:32
     * @param dml
     * @param dataIndex
     * @param targetDb
     * @param targetTable
     * @param recorder
     * @return
     * @see com.ld.shieldsb.canalclient.handler.ICanalDataSyncHandler#syncSingle(com.ld.shieldsb.canalclient.model.Dml, int,
     *      java.lang.String, java.lang.String, com.ld.shieldsb.canalclient.recoder.Recorder)
     */
    @Override
    public Result syncSingle(Dml dml, int dataIndex, String targetDb, String targetTable, Recorder recorder) {
        return rdbSyncService.syncSingle(mappingConfigCache, key, dml, dataIndex, targetDb, targetTable);

    }
}
